/*
 * LSFDispatcher.java
 *
 * Created on December 23, 2002, 11:30 AM
 *
 * This file is part of the STAR Scheduler.
 * Copyright (c) 2002-2003 STAR Collaboration - Brookhaven National Laboratory
 *
 * STAR Scheduler is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * STAR Scheduler is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with STAR Scheduler; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */
023    package gov.bnl.star.offline.scheduler.lsf;
024    
025    import gov.bnl.star.offline.scheduler.ComponentLibrary;
026    import gov.bnl.star.offline.scheduler.Dispatcher;
027    import gov.bnl.star.offline.scheduler.Job;
028    import gov.bnl.star.offline.scheduler.Request;
029    import gov.bnl.star.offline.scheduler.catalog.PhysicalFile;
030    import gov.bnl.star.offline.scheduler.util.CSHCommandLineTask;
031    //import gov.bnl.star.offline.scheduler.util.StatisticsRecorder; //Moved Statistics recording to Scheduler.java LH
032    import gov.bnl.star.offline.scheduler.util.GenericResourceRequirementStringDefinition;
033    
034    
035    import java.util.List;
036    import java.util.logging.Level;
037    import java.util.logging.Logger;
038    
039    
040    
041    /** Dispatches a job using LSF.
042     * <p>
043     * For each process, two files are created: a script for the execution and
044     * a text file containing the file list. The script basically sets the
045     * environment variables and executes the command line. The file list
046     * contains the input file requested, one full path for each line in the list.
047     * <p>
048     * Each script is submitted through bsub.
049     * <p>
050     * The simulation flag will make the scheduler not actually execute the command
051     * lines. Therefore scripts and fileLists are created, but the bsub and chmod
052     * commands are not executed. Log and output won't be affected, except that there
053     * will be a message warning that the submission is simulated.
054     *
055     * @author Gabriele Carcassi, Jerome Lauret
056     * @version 1.0 2002/12/26
057     */
058    public class LSFDispatcher implements Dispatcher {
059        static private Logger log = Logger.getLogger(LSFDispatcher.class.getName());
060        private String resourceStrategy;
061        protected String scratchDir;
062        private String bsubEx;
063        protected boolean simulation = false;
064        private String queueName;
065        private String bsubOptions;
066        private int maxBsubAttempts;
067        private int msBtwnSuccess;
068        private int msBtwnFailure;
069        private String ResReqDefinitionObj;
070        
071        public void setResourceRequirementStringDefinition(String ResReqDefinitionObj){
072            this.ResReqDefinitionObj = ResReqDefinitionObj;
073            
074        }
075        
076        public String getScratchDir() {
077            return scratchDir;
078        }
079        
080        public void setScratchDir(String scratchDir) {
081            this.scratchDir = scratchDir;
082        }
083        
084        public String getBsubEx() {
085            return bsubEx;
086        }
087        
088        public void setBsubEx(String bsubEx) {
089            this.bsubEx = bsubEx;
090        }
091    
092        public String getQueueName() {
093            return queueName;
094        }
095        
096        public void setQueueName(String queueName) {
097            this.queueName = queueName;
098        }
099        
100        public String getBsubOptions() {
101            return bsubOptions;
102        }
103        
104        public void setBsubOptions(String bsubOptions) {
105            this.bsubOptions = bsubOptions;
106        }
107        
108        public int getMaxAttempts() {
109            return maxBsubAttempts;
110        }
111        
112        public void setMaxAttempts(int maxAttempts) {
113            this.maxBsubAttempts = maxAttempts;
114        }
115        
116        public int getMsBtwnSuccess() {
117            return msBtwnSuccess;
118        }
119        
120        public void setMsBtwnSuccess(int msBtwnSuccess) {
121            this.msBtwnSuccess = msBtwnSuccess;
122        }
123        
124        public int getMsBtwnFailure() {
125            return msBtwnFailure;
126        }
127        
128        public void setMsBtwnFailure(int msBtwnFailure) {
129            this.msBtwnFailure = msBtwnFailure;
130        }
131        
132        protected boolean reportedFailure;
133        protected CSHApplication application;
134        private String resSwitch;
135    
136        /** Creates the scripts and dispatches the job on the target machine.
137         * @param request the job request
138         */
139        public void dispatch(Request request, List jobs) {
140            log.info("Dispatching using LSF: \"" + request.getCommand() + "\"");
141    
142            // Enables the simulation mode if necessary
143            useSimulationMode(request.getSimulation());
144            reportedFailure = false;
145    
146            // Submits from the higher to the lower JobID. This way the
147            // user has a feel of  when the last job is going to be
148            // submitted
149            for (int nJob = jobs.size() - 1; nJob >= 0;
150                    nJob--) {
151                Job job = (Job) jobs.get(nJob);
152    
153                System.out.print("Dispatching process " +
154                    job.getJobID() + ".");
155                dispatch(request, job);
156                if (getClusterName() != null) job.setCluster(getClusterName());
157            }
158    
159            //StatisticsRecorder.getIntance().recordStatistics(request, jobs); //Moved Statistics recording to Scheduler.java LH
160        }
161    
162        /* Enables or disables the simulation mode. The simulation mode will deactivate
163         * every command line execution.
164         */
165        protected void useSimulationMode(boolean simulation) {
166            this.simulation = simulation;
167    
168            if (simulation) {
169                // Warn the user that we are entering simulated submission mode
170                log.warning("Simulating submission");
171                System.out.println("Simulating submission");
172            }
173        }
174    
175        protected void reportProcessSubmissionFailure(Request request, Job job, int jobNumber, String message) {
176            reportFailure(job);
177            System.out.println("Process number " + jobNumber + " wasn't submitted.");
178            System.out.println(message);
179            System.out.println();
180            System.out.println("The process input file were:");
181    
182            List list = job.getInput();
183    
184            for (int nFile = 0; nFile < list.size(); nFile++) {
185                PhysicalFile file = (PhysicalFile) list.get(nFile);
186                System.out.println(" - " + file.getPath() + "/" +
187                    file.getFilename());
188            }
189        }
190    
191        protected void reportFailure(Job job) {
192            if (!reportedFailure) {
193                System.out.println("There were some errors during job submission.");
194                System.out.println("Some processes weren't submitted:");
195            }
196        }
197    
198        /** Currently not implemented
199         * @param request the job for which to retrieve the output
200         */
201        public void retrieveOutput(Request request, List jobs) {
202        }
203    
204        /* Dispatches a single process of a job request.
205         */
206        protected void dispatch(Request request, Job job) {
207            application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication");
208            
209            // TODO: all the parameters should be passed in one go
210            application.setJob(request, job);
211            application.setScratchDir(scratchDir);
212            application.setSubmissionCommand(getBsubCommand(request, job));
213    
214            application.prepareJob();
215    
216            log.info("Executing \"" + getBsubCommand(request, job) + "\"");
217    
218            if (!simulation) {
219                try {
220                    Thread.sleep(msBtwnSuccess);
221                } catch (Exception e) {
222                }
223    
224                int attempt = 0;
225                boolean success = false;
226                String pe="";
227    
228                while (!success && (attempt < maxBsubAttempts)) {
229                    try {
230                        CSHCommandLineTask task = new CSHCommandLineTask(getBsubCommand(
231                                    request, job), true, 30000);
232                        task.execute();
233    
234                        if (task.getExitStatus() != 0) {
235                            log.warning("bsub failed: " + task.getOutput());
236                            Thread.sleep(msBtwnFailure);
237                            System.out.print("/");
238                            attempt++;
239                        } else {
240                            success = true;
241                        }
242                    } catch (Exception e) {
243                        log.log(Level.SEVERE, "Couldn't submit the script to LSF", e);
244                        // JL: write also to STDOUT to help users debug what happened
245                        if (pe == ""){
246                           pe = e.toString();
247                           System.out.print("Couldn't submit the script to LSF" + pe);
248                        }
249                        try {
250                            Thread.sleep(msBtwnFailure);
251                        } catch (Exception e1) {
252                        }
253    
254                        System.out.print("/");
255                        attempt++;
256                    }
257                }
258    
259                if (success) {
260                    System.out.println(" done.");
261                } else {
262                    System.out.println(" FAILED!!");
263                }
264            } else {
265                System.out.println(" simulated.");
266            }
267        }
268    
269        /** Returns the full bsub command to be executed to dispatch the process. This
270        * command must executed in the directory in which the script resides.
271        * @return the bsub command
272        */
273        String getBsubCommand(Request request, Job job) {
274            StringBuffer bsub = new StringBuffer(bsubEx);
275            bsub.append(" -q ").append(getQueueName(job));
276    
277            if (job.getTarget() != null) {
278                bsub.append(" -m ").append(job.getTarget());
279            }
280    
281            if (application.getJobName() != null) {
282                bsub.append(" -J '").append(application.getJobName()).append("'");
283            }
284    
285            if (application.getStdin() != null) {
286                bsub.append(" -i ").append(application.getStdin());
287            }
288    
289            if (application.getStdout() != null) {
290                bsub.append(" -o ").append(application.getStdout());
291            }
292    
293            if (application.getStderr() != null) {
294                bsub.append(" -e ").append(application.getStderr());
295            }
296    
297           
298            
299            //This takes the GenericResourceRequirementStringDefinition (if any) and adds the ruage[] to it if the is any then it write out the string
300            GenericResourceRequirementStringDefinition lsfResReqDef = new GenericResourceRequirementStringDefinition();
301            
302            
303            if(ResReqDefinitionObj != null)
304            lsfResReqDef = (GenericResourceRequirementStringDefinition) ComponentLibrary.getInstance().getComponent(ResReqDefinitionObj);
305            
306           
307            if ((getResourceUsageSwitch(job) != null)&&( lsfResReqDef.hasResourcesDefinition(job))) {
308                
309           String Res = "\"(" + lsfResReqDef.makeString(job).replaceAll("\"", "").concat(") ").concat(getResourceUsageSwitch(job).replaceAll("\"", "")).concat("\"");
310            bsub.append(" -R ").append(Res);
311            }
312            else if(getResourceUsageSwitch(job) != null){
313                bsub.append(" -R ").append(getResourceUsageSwitch(job));
314            }
315            else if( lsfResReqDef.hasResourcesDefinition(job)){
316                bsub.append(" -R ").append(lsfResReqDef.makeString(job));
317            }
318            //////////////////////////lbh
319       /*     
320            
321            if (getResourceUsageSwitch(job) != null) {
322                bsub.append(" -R ").append(getResourceUsageSwitch(job));
323            }
324      */  
325            bsub.append(' ').append(bsubOptions);
326    
327    
328            bsub.append(' ').append(application.getCommandLine());
329    
330            return bsub.toString();
331        }
332    
333        
334        private LSFResourceStrategy lsfResourceStrategy;
335        
336        /** Holds value of property clusterName. */
337        private String clusterName;
338        
339        public void setResourceStrategy(LSFResourceStrategy resourceStrategy) {
340            this.lsfResourceStrategy = resourceStrategy;
341        }
342        
343        public LSFResourceStrategy getResourceStrategy() {
344            return lsfResourceStrategy;
345        }
346    
347        protected String getResourceUsageSwitch(Job job) {
348            //FIXME: cache value
349            if (getResourceStrategy() == null) {
350                return null;
351            }
352    
353            resSwitch = getResourceStrategy().prepareResourceUsageSwitch(job);
354    
355            return resSwitch;
356        }
357    
358        protected String getQueueName(Job job) {
359            String queue = job.getQueue();
360    
361            if (queue == null) {
362                return queueName;
363            }
364    
365            return queue;
366        }
367        
368        /** Getter for property clusterName.
369         * @return Value of property clusterName.
370         *
371         */
372        public String getClusterName() {
373            return this.clusterName;
374        }
375        
376        /** Setter for property clusterName.
377         * @param clusterName New value of property clusterName.
378         *
379         */
380        public void setClusterName(String clusterName) {
381            this.clusterName = clusterName;
382        }
383        
384        public void Kill(Request request, List jobs) {
385        }    
386        
387        public String Status(Job job, int Processe) {
388            return "status unavailable";
389        }
390        
391    }